/*
Copyright 2022 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package store

import (
	"context"
	"fmt"
	"math"
	"sort"
	"strings"
	"sync"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"
	utiltrace "k8s.io/utils/trace"
)

// Store is the cache for resources from multiple member clusters
type Store interface {
	UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error
	HasResource(resource schema.GroupVersionResource) bool
	GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
	Stop()

	Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
	List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
	Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
}

// MultiClusterCache caches resource from multi member clusters
type MultiClusterCache struct {
	// lock guards cache and cachedResources.
	lock sync.RWMutex
	// cache holds one clusterCache per member cluster, keyed by cluster name.
	cache           map[string]*clusterCache
	// cachedResources is the set of resources cached by at least one cluster.
	cachedResources map[schema.GroupVersionResource]struct{}
	restMapper      meta.RESTMapper
	// newClientFunc returns a dynamic client for member cluster apiserver
	newClientFunc func(string) (dynamic.Interface, error)
}

// Compile-time assertion that MultiClusterCache implements Store.
var _ Store = &MultiClusterCache{}

// NewMultiClusterCache return a cache for resources from member clusters
func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache {
	mcc := &MultiClusterCache{
		newClientFunc:   newClientFunc,
		restMapper:      restMapper,
		cache:           make(map[string]*clusterCache),
		cachedResources: make(map[schema.GroupVersionResource]struct{}),
	}
	return mcc
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error {
	if klog.V(3).Enabled() {
		start := time.Now()
		defer func() {
			klog.Infof("MultiClusterCache update cache takes %v", time.Since(start))
		}()
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// remove non-exist clusters
	for clusterName := range c.cache {
		if _, exist := resourcesByCluster[clusterName]; !exist {
			klog.Infof("Remove cache for cluster %s", clusterName)
			c.cache[clusterName].stop()
			delete(c.cache, clusterName)
		}
	}

	// add/update cluster cache
	for clusterName, resources := range resourcesByCluster {
		cache, exist := c.cache[clusterName]
		if !exist {
			klog.Infof("Add cache for cluster %v", clusterName)
			cache = newClusterCache(clusterName, c.clientForClusterFunc(clusterName), c.restMapper)
			c.cache[clusterName] = cache
		}
		err := cache.updateCache(resources)
		if err != nil {
			return err
		}
	}

	// update cachedResource
	newCachedResources := make(map[schema.GroupVersionResource]struct{}, len(c.cachedResources))
	for _, resources := range resourcesByCluster {
		for resource := range resources {
			newCachedResources[resource] = struct{}{}
		}
	}
	for resource := range c.cachedResources {
		if _, exist := newCachedResources[resource]; !exist {
			delete(c.cachedResources, resource)
		}
	}
	for resource := range newCachedResources {
		if _, exist := c.cachedResources[resource]; !exist {
			c.cachedResources[resource] = struct{}{}
		}
	}
	return nil
}

// Stop stops the cache for multi cluster.
func (c *MultiClusterCache) Stop() {
	c.lock.RLock()
	defer c.lock.RUnlock()

	for _, cache := range c.cache {
		cache.stop()
	}
}

// HasResource return whether resource is cached.
func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	_, ok := c.cachedResources[resource]
	return ok
}

// GetResourceFromCache returns which cluster the resource belong to.
func (c *MultiClusterCache) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
	obj, cluster, _, err := c.getResourceFromCache(ctx, gvr, namespace, name)
	return obj, cluster, err
}

// Get returns the target object
func (c *MultiClusterCache) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
	_, clusterName, cache, err := c.getResourceFromCache(ctx, gvr, request.NamespaceValue(ctx), name)
	if err != nil {
		return nil, err
	}
	obj, err := cache.Get(ctx, name, options)
	if err != nil {
		return nil, err
	}

	cloneObj := obj.DeepCopyObject()
	accessor, err := meta.Accessor(cloneObj)
	if err != nil {
		return nil, err
	}

	mrv := newMultiClusterResourceVersionWithCapacity(1)
	mrv.set(clusterName, accessor.GetResourceVersion())
	accessor.SetResourceVersion(mrv.String())

	addCacheSourceAnnotation(cloneObj, clusterName)

	return cloneObj, err
}

// List returns the object list
// nolint:gocyclo
func (c *MultiClusterCache) List(ctx context.Context, gvr schema.GroupVersionResource, o *metainternalversion.ListOptions) (runtime.Object, error) {
	klog.V(5).Infof("Request list %v with rv=%#v, continue=%#v, limit=%v", gvr.String(),
		newMultiClusterResourceVersionFromString(o.ResourceVersion), newMultiClusterContinueFromString(o.Continue), o.Limit)
	requestCluster, options, requestResourceVersion := prepareBeforeList(o)

	var resultObject runtime.Object
	items := make([]runtime.Object, 0, int(math.Min(float64(options.Limit), 1024)))

	clusters := c.getClusterNames()
	sort.Strings(clusters)
	responseResourceVersion := requestResourceVersion.clone()
	responseContinue := multiClusterContinue{}

	trace := utiltrace.New("MultiClusterCache.List",
		utiltrace.Field{Key: "gvr", Value: gvr},
		utiltrace.Field{Key: "rv", Value: newMultiClusterResourceVersionFromString(o.ResourceVersion)},
		utiltrace.Field{Key: "continue", Value: newMultiClusterContinueFromString(o.Continue)},
		utiltrace.Field{Key: "limit", Value: o.Limit},
	)
	defer trace.LogIfLong(5 * time.Second)

	defer func() {
		klog.V(5).Infof("Response list %v with rv=%#v continue=%#v", gvr.String(), responseResourceVersion, responseContinue)
	}()

	listFunc := func(cluster string) (int, string, error) {
		if requestCluster != "" && requestCluster != cluster {
			return 0, "", nil
		}
		// clear the requestContinue, for searching other clusters at next list
		defer func() {
			requestCluster = ""
			options.Continue = ""
		}()

		cache := c.cacheForClusterResource(cluster, gvr)
		if cache == nil {
			klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String())
			return 0, "", nil
		}

		if options.Continue != "" {
			// specifying resource version is not allowed when using continue
			options.ResourceVersion = ""
		} else {
			options.ResourceVersion = requestResourceVersion.get(cluster)
		}
		defer trace.Step("list from member cluster",
			utiltrace.Field{Key: "cluster", Value: cluster},
			utiltrace.Field{Key: "options", Value: fmt.Sprintf("%#v", options)},
		)
		obj, err := cache.List(ctx, options)
		if err != nil {
			return 0, "", err
		}

		list, err := meta.ListAccessor(obj)
		if err != nil {
			return 0, "", err
		}

		if resultObject == nil {
			resultObject = obj
		}

		cnt := 0
		err = meta.EachListItem(obj, func(o runtime.Object) error {
			clone := o.DeepCopyObject()
			addCacheSourceAnnotation(clone, cluster)
			items = append(items, clone)
			cnt++
			return nil
		})
		if err != nil {
			return 0, "", err
		}

		responseResourceVersion.set(cluster, list.GetResourceVersion())
		return cnt, list.GetContinue(), nil
	}

	if options.Limit == 0 {
		for _, cluster := range clusters {
			_, _, err := listFunc(cluster)
			if err != nil {
				return nil, err
			}
		}
	} else {
		for clusterIdx, cluster := range clusters {
			n, cont, err := listFunc(cluster)
			if err != nil {
				return nil, err
			}

			options.Limit -= int64(n)

			if options.Limit <= 0 {
				if cont != "" {
					// Current cluster has remaining items, return this cluster name and continue for next list.
					responseContinue.Cluster = cluster
					responseContinue.Continue = cont
				} else if (clusterIdx + 1) < len(clusters) {
					// Current cluster has no remaining items. But we don't know whether next cluster has.
					// So return the next cluster name for continue listing.
					responseContinue.Cluster = clusters[clusterIdx+1]
				}
				// No more items remains. Break the chuck list.
				break
			}
		}
	}

	if err := c.fillMissingClusterResourceVersion(ctx, responseResourceVersion, clusters, gvr); err != nil {
		return nil, err
	}
	responseContinue.RV = responseResourceVersion.String()

	if resultObject == nil {
		resultObject = &metav1.List{
			TypeMeta: metav1.TypeMeta{
				APIVersion: "v1",
				Kind:       "List",
			},
			ListMeta: metav1.ListMeta{},
			Items:    []runtime.RawExtension{},
		}
	}

	err := meta.SetList(resultObject, items)
	if err != nil {
		return nil, err
	}

	accessor, err := meta.ListAccessor(resultObject)
	if err != nil {
		return nil, err
	}
	accessor.SetResourceVersion(responseResourceVersion.String())
	accessor.SetContinue(responseContinue.String())
	return resultObject, nil
}

// Watch watches the resource
func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) {
	klog.V(5).Infof("Request watch %v with rv=%v", gvr.String(), options.ResourceVersion)
	resourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion)

	responseResourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion)
	responseResourceVersionLock := sync.Mutex{}
	setObjectResourceVersionFunc := func(cluster string, obj runtime.Object) {
		accessor, err := meta.Accessor(obj)
		if err != nil {
			return
		}

		responseResourceVersionLock.Lock()
		defer responseResourceVersionLock.Unlock()

		responseResourceVersion.set(cluster, accessor.GetResourceVersion())
		accessor.SetResourceVersion(responseResourceVersion.String())
	}

	mux := newWatchMux()
	clusters := c.getClusterNames()
	for i := range clusters {
		cluster := clusters[i]
		options.ResourceVersion = resourceVersion.get(cluster)
		cache := c.cacheForClusterResource(cluster, gvr)
		if cache == nil {
			continue
		}
		w, err := cache.Watch(ctx, options)
		if err != nil {
			return nil, err
		}

		mux.AddSource(w, func(e watch.Event) {
			setObjectResourceVersionFunc(cluster, e.Object)
			addCacheSourceAnnotation(e.Object, cluster)
		})
	}
	mux.Start()
	return mux, nil
}

// getClusterNames returns the names of all currently cached member clusters
// in unspecified order.
func (c *MultiClusterCache) getClusterNames() []string {
	c.lock.RLock()
	defer c.lock.RUnlock()

	clusters := make([]string, 0, len(c.cache))
	// Use an explicit name: the original loop variable `c` shadowed the
	// receiver, which is legal but error-prone.
	for clusterName := range c.cache {
		clusters = append(clusters, clusterName)
	}
	return clusters
}

// cacheForClusterResource returns the resource cache of the given cluster for
// gvr, or nil when the cluster or the resource is not cached.
func (c *MultiClusterCache) cacheForClusterResource(cluster string, gvr schema.GroupVersionResource) *resourceCache {
	c.lock.RLock()
	cc, ok := c.cache[cluster]
	c.lock.RUnlock()

	if !ok || cc == nil {
		return nil
	}
	return cc.cacheForResource(gvr)
}

// getResourceFromCache searches all cluster caches for the named object.
// It returns the object, the owning cluster's name and its resource cache.
// A NotFound error is returned when no cluster has the object, and a
// Conflict error when more than one cluster has it.
func (c *MultiClusterCache) getResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, *resourceCache, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var (
		foundObjects []runtime.Object
		foundCaches  []*resourceCache
	)

	// Set ResourceVersion=0 to get resource from cache.
	getOptions := &metav1.GetOptions{ResourceVersion: "0"}
	for clusterName, cc := range c.cache {
		rc := cc.cache[gvr]
		if rc == nil {
			// This cluster doesn't cache this resource
			continue
		}

		obj, err := rc.Get(request.WithNamespace(ctx, namespace), name, getOptions)
		switch {
		case apierrors.IsNotFound(err):
			continue
		case err != nil:
			return nil, "", nil, fmt.Errorf("fail to get %v %v from %v: %v", namespace, name, clusterName, err)
		}
		foundObjects = append(foundObjects, obj)
		foundCaches = append(foundCaches, rc)
	}

	switch len(foundObjects) {
	case 0:
		return nil, "", nil, apierrors.NewNotFound(gvr.GroupResource(), name)
	case 1:
		return foundObjects[0], foundCaches[0].clusterName, foundCaches[0], nil
	default:
		// Same name/namespace exists in multiple clusters: ambiguous.
		clusterNames := make([]string, 0, len(foundCaches))
		for _, rc := range foundCaches {
			clusterNames = append(clusterNames, rc.clusterName)
		}
		return nil, "", nil, apierrors.NewConflict(gvr.GroupResource(), name, fmt.Errorf("ambiguous objects in clusters [%v]", strings.Join(clusterNames, ", ")))
	}
}

// clientForClusterFunc returns a factory that lazily builds a dynamic client
// for the named member cluster.
func (c *MultiClusterCache) clientForClusterFunc(cluster string) func() (dynamic.Interface, error) {
	newClient := func() (dynamic.Interface, error) {
		return c.newClientFunc(cluster)
	}
	return newClient
}

// fillMissingClusterResourceVersion fetches, in parallel, the current
// resource version of gvr for every cluster that mcv does not yet cover,
// and records it into mcv.
//
// The previous implementation had two defects fixed here:
//  1. Data race: it read mcv.rvs inside the spawn loop while goroutines
//     started in earlier iterations could concurrently call mcv.set.
//     Now the missing set is computed before any goroutine starts.
//  2. Goroutine leak: errChan was unbuffered, so a second erroring goroutine
//     blocked forever on send (which also wedged wg.Wait). The channel is now
//     buffered to the number of workers so every send completes.
func (c *MultiClusterCache) fillMissingClusterResourceVersion(ctx context.Context, mcv *multiClusterResourceVersion, clusters []string, gvr schema.GroupVersionResource) error {
	// Collect the clusters lacking a resource version before spawning any
	// goroutine, so mcv.rvs is only read while no writer exists.
	missing := make([]string, 0, len(clusters))
	for _, cluster := range clusters {
		if _, ok := mcv.rvs[cluster]; !ok {
			missing = append(missing, cluster)
		}
	}
	if len(missing) == 0 {
		return nil
	}

	// Buffered so every worker can report an error without blocking.
	errChan := make(chan error, len(missing))
	var lock sync.Mutex
	var wg sync.WaitGroup

	for _, cluster := range missing {
		wg.Add(1)
		go func(cluster string) {
			defer wg.Done()
			klog.V(5).Infof("fillMissingClusterResourceVersion gvr=%v cluster=%v", gvr, cluster)
			rv, err := c.getClusterResourceVersion(ctx, cluster, gvr)
			if err != nil {
				errChan <- err
				return
			}
			if rv == "" {
				// Cluster does not cache this resource; nothing to record.
				return
			}

			// mcv is shared across workers; serialize writes.
			lock.Lock()
			defer lock.Unlock()
			mcv.set(cluster, rv)
		}(cluster)
	}

	// Wait for all workers, then surface the first reported error (if any).
	wg.Wait()
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}

// getClusterResourceVersion returns the current list resource version of gvr
// in the given cluster, or "" when the cluster does not cache the resource.
func (c *MultiClusterCache) getClusterResourceVersion(ctx context.Context, cluster string, gvr schema.GroupVersionResource) (string, error) {
	cache := c.cacheForClusterResource(cluster, gvr)
	if cache == nil {
		klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String())
		return "", nil
	}

	// Limit to a single item: only the list-level resource version is needed.
	obj, err := cache.List(ctx, &metainternalversion.ListOptions{Limit: 1})
	if err != nil {
		return "", err
	}
	accessor, err := meta.ListAccessor(obj)
	if err != nil {
		return "", err
	}
	return accessor.GetResourceVersion(), nil
}

// prepareBeforeList decodes the multi-cluster list options into the cluster
// to resume from, sanitized per-cluster options, and the decoded per-cluster
// resource versions.
//
// Inputs and outputs:
// o.ResourceVersion  o.Continue                     | cluster options.ResourceVersion options.Continue mrv
// xxxx               ""                             | ""      xxxx                    ""               decode(xxx)
// ""               {rv=xxx,cluster=c2,continue=}    | c2      decode(xxx)[c2]         ""               decode(xxx)
// ""               {rv=xxx,cluster=c2,continue=yyy} | c2      ""                      yyy              decode(xxx)
func prepareBeforeList(o *metainternalversion.ListOptions) (cluster string, options *metainternalversion.ListOptions, mrv *multiClusterResourceVersion) {
	options = o.DeepCopy()

	if o.Continue == "" {
		// Fresh list: no cluster to resume from, decode the resource version
		// directly from the request.
		return "", options, newMultiClusterResourceVersionFromString(o.ResourceVersion)
	}

	mc := newMultiClusterContinueFromString(o.Continue)
	mrv = newMultiClusterResourceVersionFromString(mc.RV)
	if mc.Continue == "" {
		// Resume at the start of mc.Cluster using its recorded version.
		options.ResourceVersion = mrv.get(mc.Cluster)
		options.Continue = ""
	} else {
		// Mid-cluster chunk: a resource version must not accompany continue.
		options.ResourceVersion = ""
		options.Continue = mc.Continue
	}
	return mc.Cluster, options, mrv
}
